package com.my.service.task;

import com.my.service.task.entity.EmailInfo;
import com.my.service.task.map.EmailMap;
import com.my.service.task.util.MongoUtils;
import com.my.service.task.reduce.EmailReducer;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.utils.ParameterTool;
import org.bson.Document;

import java.util.List;

/**
 * Batch job that aggregates e-mail records and merges the per-type counts into MongoDB.
 *
 * <p>Pipeline: read text lines from the path given by the {@code input} program argument,
 * map each line to an {@link EmailInfo} with {@link EmailMap}, group by the
 * {@code groupfield} field, reduce each group with {@link EmailReducer}, then collect the
 * reduced records to the client and add every record's count to the matching stored document.
 */
public class EmailTask {

    /**
     * Entry point of the job.
     *
     * @param args command-line arguments parsed by {@link ParameterTool}; must contain
     *             {@code --input <path>} pointing at the text file to analyse
     * @throws Exception if the Flink job fails or the Mongo helper throws
     */
    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);

        DataSet<String> text = env.readTextFile(params.get("input"));
        DataSet<EmailInfo> mapResult = text.map(new EmailMap());
        DataSet<EmailInfo> reduceResult = mapResult.groupBy("groupfield").reduce(new EmailReducer());

        // collect() itself triggers execution of the plan and ships the result to the client.
        // A trailing env.execute() must NOT be called afterwards: with no new sink defined it
        // fails with "No new data sinks have been defined since the last execution".
        List<EmailInfo> resultList = reduceResult.collect();
        for (EmailInfo eInfo : resultList) {
            mergeCount(eInfo);
        }
    }

    /**
     * Adds the count of one reduced record to the document stored for its e-mail type,
     * creating the document when none exists yet.
     *
     * <p>NOTE(review): the meaning of the two string arguments passed to {@link MongoUtils}
     * (presumably database/collection names) and of the lookup key is defined by that helper;
     * confirm there.
     *
     * @param eInfo reduced record carrying the e-mail type and its count
     */
    private static void mergeCount(EmailInfo eInfo) {
        String email = eInfo.getEmailtype();
        Long boxedCount = eInfo.getCount();
        // Treat an absent count as zero instead of failing on unboxing.
        long count = boxedCount == null ? 0L : boxedCount;

        Document res = MongoUtils.findOneBy("emailstatics",
                "realtimeportrait",
                email
        );
        if (res == null) {
            res = new Document();
            res.put("info", email);
            res.put("count", count);
        } else {
            // Read through Number: the stored value may be an Integer/Double (or missing),
            // in which case Document.getLong would throw.
            long total = toLong(res.get("count")) + count;
            res.put("count", total);
        }
        MongoUtils.saveOrUpdateMongo(
                "emailstatics",
                "realtimeportrait",
                res
        );
    }

    /**
     * Converts a stored counter value to {@code long}.
     *
     * @param value raw field value, possibly {@code null} or any {@link Number} subtype
     * @return the numeric value as {@code long}, or {@code 0} when it is absent or not a number
     */
    private static long toLong(Object value) {
        return value instanceof Number ? ((Number) value).longValue() : 0L;
    }
}
